package com.dahuan.transform;

import com.dahuan.bean.SensorReading;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Flink streaming example that demonstrates a keyed {@code reduce}.
 *
 * <p>Each input line is split on commas into {@code id,timestamp,temperature} and turned
 * into a {@link SensorReading}. Readings are grouped by id, and for every group a rolling
 * aggregate is emitted that carries the highest temperature seen so far together with the
 * timestamp of the most recently processed reading.
 */
public class Transform_Reduce {

    /** Input file used when no path is passed on the command line. */
    private static final String DEFAULT_INPUT_PATH =
            "E:\\Project\\FlinkTutorials\\Flink-Scala\\src\\main\\resources\\sensor.txt";

    /**
     * Builds and runs the job.
     *
     * @param args optional; {@code args[0]} overrides the default input file path
     * @throws Exception if the Flink job fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism( 1 );

        // Read the raw lines from a text file.
        String path = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;
        DataStream<String> inputStream = env.readTextFile( path );

        // Convert each comma-separated line into a SensorReading.
        // valueOf(String) replaces the deprecated Long(String)/Double(String) constructors
        // and still yields the same boxed types.
        DataStream<SensorReading> mapStream = inputStream.map( data -> {
            String[] splits = data.split( "," );
            return new SensorReading( splits[0], Long.valueOf( splits[1] ), Double.valueOf( splits[2] ) );
        } );

        // Group by sensor id using a type-safe key selector instead of a field-name string.
        KeyedStream<SensorReading, String> keyedStream = mapStream.keyBy( SensorReading::getId );

        // Rolling reduce per key: keep the id of the accumulated value, take the timestamp
        // of the incoming reading, and keep the larger of the two temperatures.
        SingleOutputStreamOperator<SensorReading> reduce = keyedStream.reduce(
                (curData, newData) -> new SensorReading(
                        curData.getId(),
                        newData.getTimestamp(),
                        Math.max( curData.getTemperature(), newData.getTemperature() ) ) );

        reduce.print();
        env.execute( "Transform_Reduce" );
    }
}
